package org.apache.solr.handler.dataimport;

import static org.apache.solr.handler.dataimport.DataImporter.IMPORT_CMD;

import java.lang.invoke.MethodHandles;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.RawResponseWriter;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DataImportHandler}包装，只为在在处理完成时，可以得到处理处理的数据（在{@link RecordableDocBuilder}
 * 中实现）。所有功能均相同，只是在{@link #inform}方法中初始化 {@link #importer}变量时，改为使用
 * {@link RecordableDataImport}类.
 * 
 * @author Gex
 *
 */
public class RecordableDataImportHandler extends RequestHandlerBase implements
SolrCoreAware {

	private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

	  private DataImporter importer;

	  private boolean debugEnabled = true;

	  private String myName = "dataimport";

	  private static final String PARAM_WRITER_IMPL = "writerImpl";
	  private static final String DEFAULT_WRITER_NAME = "SolrWriter";

	  public DataImporter getImporter() {
	    return this.importer;
	  }

	  @Override
	  @SuppressWarnings({ "rawtypes" })
	  public void init(NamedList args) {
	    super.init(args);
	    Map<String,String> macro = new HashMap<>();
	    macro.put("expandMacros", "false");
	    defaults = SolrParams.wrapDefaults(defaults, new MapSolrParams(macro));
	  }

	  @Override
	  public void inform(SolrCore core) {
	    try {
	      String name = getPluginInfo().name;
	      if (name.startsWith("/")) {
	        myName = name.substring(1);
	      }
	      // some users may have '/' in the handler name. replace with '_'
	      myName = myName.replaceAll("/", "_");
	      debugEnabled = StrUtils.parseBool((String)initArgs.get(ENABLE_DEBUG), true);
	      importer = new RecordableDataImport(core, myName);         
	    } catch (Exception e) {
	      LOG.error( DataImporter.MSG.LOAD_EXP, e);
	      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, DataImporter.MSG.LOAD_EXP, e);
	    }
	  }

	  @SuppressWarnings("rawtypes")
	@Override
	  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
	          throws Exception {
	    rsp.setHttpCaching(false);
	    
	    //TODO: figure out why just the first one is OK...
	    ContentStream contentStream = null;
	    Iterable<ContentStream> streams = req.getContentStreams();
	    if(streams != null){
	      for (ContentStream stream : streams) {
	          contentStream = stream;
	          break;
	      }
	    }
	    SolrParams params = req.getParams();
	    NamedList defaultParams = (NamedList) initArgs.get("defaults");
	    RequestInfo requestParams = new RequestInfo(req, getParamsMap(params), contentStream);
	    String command = requestParams.getCommand();
	    
	    if (DataImporter.SHOW_CONF_CMD.equals(command)) {    
	      String dataConfigFile = params.get("config");
	      String dataConfig = params.get("dataConfig");
	      if(dataConfigFile != null) {
	        dataConfig = SolrWriter.getResourceAsString(req.getCore().getResourceLoader().openResource(dataConfigFile));
	      }
	      if(dataConfig==null)  {
	        rsp.add("status", DataImporter.MSG.NO_CONFIG_FOUND);
	      } else {
	        // Modify incoming request params to add wt=raw
	        ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
	        rawParams.set(CommonParams.WT, "raw");
	        req.setParams(rawParams);
	        ContentStreamBase content = new ContentStreamBase.StringStream(dataConfig);
	        rsp.add(RawResponseWriter.CONTENT, content);
	      }
	      return;
	    }

	    rsp.add("initArgs", initArgs);
	    String message = "";

	    if (command != null) {
	      rsp.add("command", command);
	    }
	    // If importer is still null
	    if (importer == null) {
	      rsp.add("status", DataImporter.MSG.NO_INIT);
	      return;
	    }

	    if (command != null && DataImporter.ABORT_CMD.equals(command)) {
	      importer.runCmd(requestParams, null);
	    } else if (importer.isBusy()) {
	      message = DataImporter.MSG.CMD_RUNNING;
	    } else if (command != null) {
	      if (DataImporter.FULL_IMPORT_CMD.equals(command)
	              || DataImporter.DELTA_IMPORT_CMD.equals(command) ||
	              IMPORT_CMD.equals(command)) {
	        importer.maybeReloadConfiguration(requestParams, defaultParams);
	        UpdateRequestProcessorChain processorChain =
	                req.getCore().getUpdateProcessorChain(params);
	        UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
	        SolrResourceLoader loader = req.getCore().getResourceLoader();
	        DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
	        
	        if (requestParams.isDebug()) {
	          if (debugEnabled) {
	            // Synchronous request for the debug mode
	            importer.runCmd(requestParams, sw);
	            rsp.add("mode", "debug");
	            rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
	            if (requestParams.getDebugInfo().debugVerboseOutput != null) {
	              rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
	            }
	          } else {
	            message = DataImporter.MSG.DEBUG_NOT_ENABLED;
	          }
	        } else {
	          // Asynchronous request for normal mode
	          if(requestParams.getContentStream() == null && !requestParams.isSyncMode()){
	            importer.runAsync(requestParams, sw);
	          } else {
	            importer.runCmd(requestParams, sw);
	          }
	        }
	      } else if (DataImporter.RELOAD_CONF_CMD.equals(command)) { 
	        if(importer.maybeReloadConfiguration(requestParams, defaultParams)) {
	          message = DataImporter.MSG.CONFIG_RELOADED;
	        } else {
	          message = DataImporter.MSG.CONFIG_NOT_RELOADED;
	        }
	      }
	    }
	    rsp.add("status", importer.isBusy() ? "busy" : "idle");
	    rsp.add("importResponse", message);
	    rsp.add("statusMessages", importer.getStatusMessages());
	  }

	  private Map<String, Object> getParamsMap(SolrParams params) {
	    Iterator<String> names = params.getParameterNamesIterator();
	    Map<String, Object> result = new HashMap<>();
	    while (names.hasNext()) {
	      String s = names.next();
	      String[] val = params.getParams(s);
	      if (val == null || val.length < 1)
	        continue;
	      if (val.length == 1)
	        result.put(s, val[0]);
	      else
	        result.put(s, Arrays.asList(val));
	    }
	    return result;
	  }

	  private DIHWriter getSolrWriter(final UpdateRequestProcessor processor,
	      final SolrResourceLoader loader, final RequestInfo requestParams,
	      SolrQueryRequest req) {
	    SolrParams reqParams = req.getParams();
	    String writerClassStr = null;
	    if (reqParams != null && reqParams.get(PARAM_WRITER_IMPL) != null) {
	      writerClassStr = (String) reqParams.get(PARAM_WRITER_IMPL);
	    }
	    @SuppressWarnings("unused")
		DIHWriter writer;
	    if (writerClassStr != null
	        && !writerClassStr.equals(DEFAULT_WRITER_NAME)
	        && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "."
	            + DEFAULT_WRITER_NAME)) {
	      try {
	        @SuppressWarnings("unchecked")
	        Class<DIHWriter> writerClass = DocBuilder.loadClass(writerClassStr, req.getCore());
	        Constructor<DIHWriter> cnstr = writerClass.getConstructor(new Class[] {
	            UpdateRequestProcessor.class, SolrQueryRequest.class});
	        return cnstr.newInstance((Object) processor, (Object) req);
	      } catch (Exception e) {
	        throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
	            "Unable to load Writer implementation:" + writerClassStr, e);
	      }
	    } else {
	      return new SolrWriter(processor, req) {
	        @Override
	        public boolean upload(SolrInputDocument document) {
	          try {
	            return super.upload(document);
	          } catch (RuntimeException e) {
	            LOG.error("Exception while adding: " + document, e);
	            return false;
	          }
	        }
	      };
	    }
	  }
	  
	  @Override
	  @SuppressWarnings({ "unchecked", "rawtypes" })
	  public NamedList getStatistics() {
	    if (importer == null)
	      return super.getStatistics();

	    DocBuilder.Statistics cumulative = importer.cumulativeStatistics;
	    SimpleOrderedMap result = new SimpleOrderedMap();

	    result.add("Status", importer.getStatus().toString());

	    if (importer.docBuilder != null) {
	      DocBuilder.Statistics running = importer.docBuilder.importStatistics;
	      result.add("Documents Processed", running.docCount);
	      result.add("Requests made to DataSource", running.queryCount);
	      result.add("Rows Fetched", running.rowsCount);
	      result.add("Documents Deleted", running.deletedDocCount);
	      result.add("Documents Skipped", running.skipDocCount);
	    }

	    result.add(DataImporter.MSG.TOTAL_DOC_PROCESSED, cumulative.docCount);
	    result.add(DataImporter.MSG.TOTAL_QUERIES_EXECUTED, cumulative.queryCount);
	    result.add(DataImporter.MSG.TOTAL_ROWS_EXECUTED, cumulative.rowsCount);
	    result.add(DataImporter.MSG.TOTAL_DOCS_DELETED, cumulative.deletedDocCount);
	    result.add(DataImporter.MSG.TOTAL_DOCS_SKIPPED, cumulative.skipDocCount);

	    NamedList requestStatistics = super.getStatistics();
	    if (requestStatistics != null) {
	      for (int i = 0; i < requestStatistics.size(); i++) {
	        result.add(requestStatistics.getName(i), requestStatistics.getVal(i));
	      }
	    }

	    return result;
	  }

	  // //////////////////////SolrInfoMBeans methods //////////////////////

	  @Override
	  public String getDescription() {
	    return DataImporter.MSG.JMX_DESC;
	  }

	  public static final String ENABLE_DEBUG = "enableDebug";

}
